package com.innovation.ic.b1b.framework.thread;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.innovation.ic.b1b.framework.manager.ZookeeperManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.annotation.Resource;

/**
 * Base class for worker threads that perform a unit of work and then delete the
 * ZooKeeper node named by the consumed record.
 *
 * <p>Subclasses implement {@link #doWork()} and call
 * {@link #doInRunMethod(ConsumerRecord)} from their {@code run} method.
 */
@Slf4j
public abstract class AbstractUnlockThread {

    /** Parent ZooKeeper path under which the node named by each record is deleted. */
    private static final String INVENTORY_NODE_PARENT = "/sc-dev/table/inventory/";

    @Resource
    protected ZookeeperManager zookeeperManager;

    /**
     * The thread's actual work goes in this method.
     */
    protected abstract void doWork();

    /**
     * Logs the record, runs {@link #doWork()}, then deletes the ZooKeeper node named by
     * the record's {@code zookeeper_node} value. Intended to be called from the thread
     * class's {@code run} method.
     *
     * <p>A failure to delete the node is logged at WARN level and not rethrown. If the
     * record carries no usable {@code zookeeper_node} value, the deletion is skipped
     * with a warning.
     *
     * @param consumer the consumed record describing the database change
     */
    protected void doInRunMethod(ConsumerRecord<?, ?> consumer) {
        this.printConsumerContent(consumer);

        // NOTE(review): if doWork() throws, the node below is never deleted. Confirm
        // whether the node should also be released on failure (e.g. via try/finally).
        this.doWork();

        String zookeeperNode = this.getZookeeperNode(consumer);
        if (zookeeperNode == null || zookeeperNode.isEmpty()) {
            // Without a node name the path would be ".../null" or the parent itself.
            log.warn("No zookeeper_node in record (topic={}, partition={}, offset={}); skipping node deletion",
                    consumer.topic(), consumer.partition(), consumer.offset());
            return;
        }

        String nodePath = INVENTORY_NODE_PARENT + zookeeperNode;
        try {
            zookeeperManager.deleteNodeIfExists(nodePath);
            // Only report the deletion when the call actually returned normally.
            log.debug("删除{}", nodePath);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is kept.
            log.warn("Failed to delete ZooKeeper node {}", nodePath, e);
        }
    }

    /**
     * Logs the content of the observed database change at DEBUG level.
     *
     * @param consumer the consumed record
     */
    private void printConsumerContent(ConsumerRecord<?, ?> consumer) {
        // Parameterized form: the message is only built when DEBUG is enabled.
        log.debug("topic【{}】，key【{}】，分区位置【{}】，下标【{}】，value【{}】",
                consumer.topic(), consumer.key(), consumer.partition(), consumer.offset(), consumer.value());
    }

    /**
     * Extracts the {@code zookeeper_node} value from the database change content.
     *
     * <p>The record value is expected to be a JSON string shaped like
     * {@code {"data":[{"zookeeper_node":"..."}]}}; only the first element of
     * {@code data} is read. Text that is not valid JSON is not handled here, so any
     * parsing exception propagates to the caller.
     *
     * @param consumer the consumed record
     * @return the {@code zookeeper_node} value, or {@code null} when the value is not a
     *         String, or when the {@code data} array, its first element, or the
     *         {@code zookeeper_node} key is missing
     */
    private String getZookeeperNode(ConsumerRecord<?, ?> consumer) {
        Object rawValue = consumer.value();
        if (!(rawValue instanceof String)) {
            // Covers both a null value and an unexpected payload type.
            return null;
        }

        JSONObject valueJSONObject = JSONObject.parseObject((String) rawValue);
        if (valueJSONObject == null) {
            return null;
        }

        JSONArray data = valueJSONObject.getJSONArray("data");
        if (data == null || data.isEmpty()) {
            return null;
        }

        JSONObject firstRow = data.getJSONObject(0);
        if (firstRow == null) {
            return null;
        }

        String zookeeperNode = firstRow.getString("zookeeper_node");
        log.debug("zookeeper_node【{}】", zookeeperNode);
        return zookeeperNode;
    }
}
